/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.service.impl;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Map;

import javax.annotation.Nonnull;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.unidata.mdm.core.dto.reports.string.FailedSuccessReport;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.service.job.NotificationGenerator;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.dao.StorageDAO;
import org.unidata.mdm.job.reindex.configuration.ReindexJobConfigurationConstants;
import org.unidata.mdm.meta.service.job.ModelJobSupport;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.EtalonIndexType;
import org.unidata.mdm.search.context.ComplexSearchRequestContext;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.dto.ComplexSearchResultDTO;
import org.unidata.mdm.search.exception.SearchApplicationException;
import org.unidata.mdm.search.exception.SearchExceptionIds;
import org.unidata.mdm.search.service.SearchService;
import org.unidata.mdm.system.type.annotation.ModuleRef;
import org.unidata.mdm.system.type.module.Module;
import org.unidata.mdm.system.util.TextUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

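/**
 * Reindex data job listener. Captures index counts before the job starts and
 * contributes a 'was / now' count comparison to the message generated after the job.
 */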
@JobScope
public class ReindexDataJobListener extends NotificationGenerator implements ModelJobSupport {
    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexDataJobListener.class);
    /**
     * Id of the classifier data module.
     */
    private static final String CLASSIFIER_MODULE_ID = "com.unidata.mdm.classifier.data";

    /**
     * 'was' message
     */
    private static final String WAS = "app.job.reindex.previous.state";

    private static final String RECORDS = "app.job.reindex.records";
    private static final String CLASSIFIERS = "app.job.reindex.classifiers";
    private static final String CLASSIFIED_RECORDS = "app.job.reindex.classified.records";
    /**
     * 'now' message
     */
    private static final String NOW = "app.job.reindex.now";
    /**
     * Empty result
     */
    private static final String EMPTY_RESULT = "app.job.reindex.empty";
    /**
     * Search service
     */
    @Autowired
    private SearchService searchService;
    /**
     * Meta model service.
     */
    @Autowired
    protected MetaModelService metaModelService;
    /**
     * Data storage DAO, used to obtain bare JDBC connections for the id log table.
     */
    @Autowired
    private StorageDAO dataStorageDAO;
    /**
     * Reindex types.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_TYPES + "] ?: null}")
    private String reindexTypes;
    /**
     * Suppress standard report, if the job is run as part of another complex process.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_SKIP_DEFAULT_REPORT + "] ?: false}")
    private boolean skipDefaultReport;
    /**
     * If true, ids will be logged to a table, if some shards fail.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_WRITE_ID_LOG + "] ?: false}")
    private boolean writeIdLog;
    /**
     * If true, id log should be processed instead of normal data tables.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_PROCESS_ID_LOG + "] ?: false}")
    private boolean processIdLog;
    /**
     * Job operation id
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_OPERATION_ID + "]}")
    private String operationId;
    /**
     * Default object mapper.
     */
    @Autowired
    protected ObjectMapper objectMapper;

    @ModuleRef(CLASSIFIER_MODULE_ID)
    private Module classiferModule;

    private Triple<Long, Long, Long> prevTotalCount;

    /**
     * Takes a snapshot of the current index counts (unless the default report is skipped),
     * runs the pre-job preparation steps and finally delegates to the parent listener.
     * Any exception is logged and registered on the execution as a failure exception
     * instead of being rethrown.
     *
     * @param jobExecution the job execution this listener is invoked for
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        try {
            // The snapshot is only needed for the 'was / now' section of the default report.
            final boolean snapshotRequired = !skipDefaultReport;
            if (snapshotRequired) {
                this.prevTotalCount = getTotalCount();
            }
            prepareClusterBeforeJob();
            prepareIdLog();
            super.beforeJob(jobExecution);
        } catch (Exception failure) {
            LOGGER.warn("Before job caught an exception.", failure);
            jobExecution.addFailureException(failure);
        }
    }


    /**
     * Runs the post-job cleanup steps (cluster options, id log table) and then delegates
     * to the parent listener. Any exception is logged and registered on the execution
     * as a failure exception instead of being rethrown.
     *
     * @param jobExecution the job execution this listener is invoked for
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        try {
            // Cleanup first, parent processing last.
            resetClusterAfterJob();
            resetIdLog();
            super.afterJob(jobExecution);
        } catch (Exception failure) {
            LOGGER.warn("After job caught an exception.", failure);
            jobExecution.addFailureException(failure);
        }
    }

    /**
     * Returns the user supplied report message if the job parameters carry a non-blank one,
     * otherwise the general message produced by the parent class.
     *
     * @param jobExecution the job execution to build the message for
     * @return the general message
     */
    @Nonnull
    @Override
    protected String getGeneralMessage(JobExecution jobExecution) {
        final String customMessage = extractUserReportMessage(jobExecution);
        return StringUtils.isNotBlank(customMessage)
                ? customMessage
                : super.getGeneralMessage(jobExecution);
    }

    /**
     * Builds the statistics part of the report: one line per counter (records, classifiers,
     * classified records), each showing the count taken before the job and the count taken now.
     * <p>
     * Returns an empty string if the default report is skipped or if no counts were captured
     * before the job (for instance because {@link #beforeJob(JobExecution)} failed before
     * the snapshot was taken).
     *
     * @param jobExecution the job execution to build the message for
     * @return the statistics text, never null
     */
    @Nonnull
    @Override
    protected String getAdditionMessage(JobExecution jobExecution) {

        if (skipDefaultReport) {
            return StringUtils.EMPTY;
        }

        // The baseline is assigned in beforeJob(); without it a 'was / now' comparison is meaningless.
        final Triple<Long, Long, Long> previousTotalCount = prevTotalCount;
        if (previousTotalCount == null) {
            LOGGER.warn("Counts collected before the job are not available, reindex statistics skipped.");
            return StringUtils.EMPTY;
        }

        final Triple<Long, Long, Long> currentTotalCount = getTotalCount();
        final StringBuilder rb = new StringBuilder();

        // 1. Records
        appendReindexCountLine(rb, RECORDS, previousTotalCount.getLeft(), currentTotalCount.getLeft());
        // 2. Classifiers
        appendReindexCountLine(rb, CLASSIFIERS, previousTotalCount.getMiddle(), currentTotalCount.getMiddle());
        // 3. Classified records
        appendReindexCountLine(rb, CLASSIFIED_RECORDS, previousTotalCount.getRight(), currentTotalCount.getRight());

        return rb.toString();
    }

    /**
     * Appends a single " label (was ...; now ...)" line, terminated by a line feed, to the target.
     *
     * @param target the buffer to append to
     * @param labelKey the message key of the line label
     * @param previous the count taken before the job
     * @param current the count taken after the job
     */
    private static void appendReindexCountLine(StringBuilder target, String labelKey, Long previous, Long current) {

        // The 'success' slot of the report carries the previous value, the 'failed' slot the current one.
        final String report = FailedSuccessReport.builder()
                .setSuccessCount(previous.intValue())
                .setFailedCount(current.intValue())
                .setEmptyMessage(TextUtils.getText(EMPTY_RESULT))
                .setSuccessMessage(TextUtils.getText(WAS))
                .setFailedMessage(TextUtils.getText(NOW))
                .noTrailingSpace(true)
                .noTrailingDot(true)
                .noLineSeparator(true)
                .valuesSeparator(Character.toString(FailedSuccessReport.SEMI_COLON))
                .createFailedSuccessReport()
                .generateReport();

        // Strip a single leading space; an empty report is passed through untouched instead of failing on charAt(0).
        final String text = StringUtils.isNotEmpty(report) && report.charAt(0) == FailedSuccessReport.SPACE
                ? report.substring(1)
                : report;

        target
            .append(FailedSuccessReport.SPACE)
            .append(TextUtils.getText(labelKey))
            .append(FailedSuccessReport.SPACE)
            .append("(")
            .append(text)
            .append(")")
            .append(StringUtils.LF);
    }

    /**
     * Extracts the user defined report message from the job parameters.
     * The user report parameter is parsed as a JSON object; the 'message' entry is selected
     * when the job exit status equals {@link ExitStatus#COMPLETED}, the 'fail message' entry otherwise.
     *
     * @param jobExecution the job execution to read parameters and exit status from
     * @return the message, or an empty string if the parameter is blank, cannot be parsed
     *         or does not contain the selected entry
     */
    private String extractUserReportMessage(JobExecution jobExecution) {

        final String userReport = jobExecution.getJobParameters().getString(ReindexJobConfigurationConstants.USER_REPORT_PARAM);
        if (StringUtils.isBlank(userReport)) {
            return StringUtils.EMPTY;
        }

        // Constant first: an execution without exit status is treated as not completed.
        final String messageType = ExitStatus.COMPLETED.equals(jobExecution.getExitStatus()) ?
                ReindexJobConfigurationConstants.USER_REPORT_MESSAGE_PARAM :
                ReindexJobConfigurationConstants.USER_REPORT_FAIL_MESSAGE_PARAM;

        try {
            // readValue returns null for a JSON 'null' document, and the entry itself may be absent.
            final Map<?, ?> params = objectMapper.readValue(userReport, Map.class);
            final Object message = params != null ? params.get(messageType) : null;
            if (message != null) {
                return message.toString();
            }

            LOGGER.warn("User report parameter contains no entry for key {}.", messageType);
        } catch (Exception e) {
            LOGGER.error("Cannot parse user report parameter due to an exception", e);
        }

        return StringUtils.EMPTY;
    }

    /**
     * Counts the indexed records of all entity types selected for this job run.
     * <p>
     * Search failures for a single type are logged and the type is skipped, so that the
     * remaining types are still counted.
     *
     * @return triple of (records, classifier data, classified records) totals; the last two
     *         are currently always 0, because the code computing them is commented out
     */
    private Triple<Long, Long, Long> getTotalCount() {

        // Primitive accumulators: no boxing / unboxing on every addition
        long totalCount = 0L;
        long totalClassifierDataCount = 0L;
        long totalClassifiedRecordsCount = 0L;

        for (String type : getEntityList(reindexTypes)) {
            try {
                // 1. Records
                SearchRequestContext sCtx = SearchRequestContext.builder(EtalonIndexType.ETALON, type)
                        .totalCount(true)
                        .countOnly(true)
                        .fetchAll(true)
                        .build();

                SearchRequestContext subRequest = SearchRequestContext.builder(EntityIndexType.RECORD, type, SecurityUtils.getCurrentUserStorageId())
                        .totalCount(true)
                        .countOnly(true)
                        .fetchAll(true)
                        .runExits(false)
                        .build();

                ComplexSearchRequestContext ctx = ComplexSearchRequestContext.hierarchical(sCtx, subRequest);

                ComplexSearchResultDTO result = searchService.search(ctx);

                // Only the main result contributes; a missing result or main part counts as 0
                // NOTE(review): whether getMain() can actually be null is not visible here, the guard is defensive
                long count = result != null && result.getMain() != null
                        ? result.getMain().getTotalCount()
                        : 0L;

                totalCount += count;

                // 2. Classifiers (disabled)
                /*
                if (CollectionUtils.isNotEmpty(metaModelService.getClassifiersForEntity(type))) {

                    SearchRequestContextBuilder sCtxb = SearchRequestContext.builder(ClassifierDataIndexType.CLASSIFIER, type)
                        .totalCount(true)
                        .countOnly(true)
                        .onlyQuery(true)
                        .fetchAll(true);

                    result = searchService.search(sCtxb.build());
                    totalClassifierDataCount += result.getTotalCount();

                    sCtxb
                        .countOnly(false)
                        .count(0)
                        .aggregations(Collections.singletonList(
                            CardinalityAggregationRequestContext.builder()
                                .entity(type)
                                .name("total_classified_records")
                                .path(ClassifierDataHeaderField.FIELD_ETALON_ID_RECORD.getName())
                                .build()
                    ));

                    result = searchService.search(sCtxb.build());
                    AggregationResultDTO cr = result.getAggregates()
                        .stream()
                        .filter(ar -> "total_classified_records".equals(ar.getAggregationName()))
                        .findFirst()
                        .orElse(null);

                    if (Objects.nonNull(cr)) {
                        totalClassifiedRecordsCount += cr.getCountMap().get(ClassifierDataHeaderField.FIELD_ETALON_ID_RECORD.getName());
                    }
                }
                */
                // 3. Relations TODO
                // UN-9281
            } catch (SearchApplicationException e) {
                // A missing mapping is reported at info level only, any other search failure as a warning
                if (e.getId() == SearchExceptionIds.EX_SEARCH_ES_NO_MAPPING_FOUND) {
                    LOGGER.info("Index doesn't exist for entity {}.", type);
                } else {
                    LOGGER.warn("Search exception caught while processing type {}.", type, e);
                }
            }
        }

        return new ImmutableTriple<>(totalCount, totalClassifierDataCount, totalClassifiedRecordsCount);
    }

    /**
     * Hook executed before mass indexing starts.
     * Currently it only writes an info log entry; no cluster option is changed here,
     * since the setting noted in the body cannot be updated dynamically.
     */
    private void prepareClusterBeforeJob() {
        LOGGER.info("Setting bulk-optimized cluster options.");
        /*
         * ("indices.memory.index_buffer_size", "40%"); // Increase indexing buffer size. Cannot be done dynamically.
         */
    }

    /**
     * Creates the id log table of this operation if id logging is requested.
     * The table name is "reindex_id_log_" followed by the operation id with hyphens replaced
     * by underscores. An SQL failure is logged and not propagated.
     */
    private void prepareIdLog() {

        if (writeIdLog) {

            // NOTE(review): the operation id is concatenated into the statement as an identifier,
            // it is presumably a generated id - confirm it never carries user input.
            final String idLogTable = "reindex_id_log_" + StringUtils.replace(operationId, "-", "_");
            final String ddl = "create unlogged table if not exists "
                    + idLogTable
                    + " (id bigserial primary key, etalon_id uuid, name text, operation_id varchar(256), ts timestamptz)";

            try (Connection connection = dataStorageDAO.getBareConnection();
                 Statement statement = connection.createStatement()) {
                statement.executeUpdate(ddl);
            } catch (SQLException sqle) {
                LOGGER.error("Cannot create id log table, SQLE caught.", sqle);
            }
        }
    }

    /**
     * Drops the id log table of this operation after the job, if the table contains no rows.
     * Does nothing unless the id log was either written or processed by this job run.
     * The table name is "reindex_id_log_" followed by the operation id with hyphens replaced
     * by underscores. An SQL failure is logged and not propagated.
     */
    private void resetIdLog() {

        if (!writeIdLog && !processIdLog) {
            return;
        }

        final String tableName = "reindex_id_log_" + StringUtils.replace(operationId, "-", "_");
        try (Connection connection = dataStorageDAO.getBareConnection();
             Statement statement = connection.createStatement()) {

            boolean isEmpty = false;

            // The result set is closed explicitly before the same statement is reused for the drop.
            // The query yields 'true' if the table has no rows and 'false' otherwise.
            try (ResultSet result = statement.executeQuery(
                    "select coalesce((select false from " + tableName + " fetch first 1 rows only), true)")) {
                isEmpty = result.next() && result.getBoolean(1);
            }

            if (isEmpty) {
                statement.executeUpdate("drop table if exists " + tableName);
            }

        } catch (SQLException e) {
            LOGGER.error("Cannot reset id log table, SQLE caught.", e);
        }
    }
    /**
     * Hook executed after mass indexing has finished.
     * Currently it only writes an info log entry; no cluster option is changed here,
     * since the setting noted in the body cannot be updated dynamically.
     */
    private void resetClusterAfterJob() {
        LOGGER.info("Setting default cluster options.");
        /*
         * ("indices.memory.index_buffer_size", "10%"); // Decrease indexing buffer size. Cannot be done dynamically.
         */
    }
    /**
     * {@inheritDoc}
     * <p>
     * This implementation returns the {@link MetaModelService} instance injected into this listener.
     */
    @Override
    public MetaModelService metaModelService() {
        return metaModelService;
    }
}